package com.bootdo.rabbitMQ.direct;


import com.alibaba.fastjson.JSONObject;
import com.bootdo.rabbitMQ.entity.MessageVo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;


/**
 * @Author wukq
 * @Date: 2021/4/10 8:52
 * @Description: 开启手动确认
 * 自动确认和手动确认的区别？
 * 消费者接受消费消息的时候：如果是自动ack，意思就是接受了一个消息，自动向消息中间件确认了这个消息被消费了，
 * 而现实中就有可能消费者并没有正确消费，或者后续处理有异常导致此消息消费失败！
 * 而手动ack，就在你所有逻辑代码执行后，在回复ack给消息中间件，这样就保证了消息的正确消费。
 * <p>
 * 还有就是性能问题，我觉得是你多虑了，好的系统是优化出来的。不是设计出来的，你正常手动。有性能瓶颈再说如何去解决。
 * 而且不是所有消息都是需要手动的，要具体看应用场景。
 * <p>
 * 手动的情况可以设置最大未消费条数,同时开个线程池处理,速度很快的.不建议自动,不然要是程序出问题丢数据的情况有你哭的
 */
@Component
@RabbitListener(queues = DirectConstant.QUEUE_DIRECT)//监听队列
public class DirectConsumer {

  @Autowired
  private CachingConnectionFactory connectionFactory;

  /**
   * No method found for class java.lang.String ,参数上必须加上String s
   * Listener method 'no match' threw exception,存储的时候是String，读取的时候也要是String
   * MessageVo messageVo = new MessageVo();
   * messageVo.setMsg(s);//存储的是string,所以读取的时候也要是String
   * s {"msg":"[{\"content\":\"放假也要写作业\",\"name\":\"小明\",\"title\":\"放假了\"},{\"content\":\"去襄阳旅游吃牛肉面，看古城\",\"name\":\"小王\",\"title\":\"去旅游\"},{\"content\":\"九宫山风景真不错啊\",\"name\":\"小李\",\"title\":\"去爬山\"}]"}
   */
  @RabbitHandler
  public void process(String s, Message message, Channel channel) throws Exception {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setMaxConcurrentConsumers(10);
    factory.setPrefetchCount(1);
    factory.setDefaultRequeueRejected(true);
    //手动确认
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    byte[] body = message.getBody();
    String bodyString = new String(body);

    MessageVo messageVo = JSONObject.parseObject(bodyString, MessageVo.class);
    System.out.println("-----------------messageVO" + messageVo.getMsg());
    String msg = messageVo.getMsg();
    List<DirectNews> directNews = JSONObject.parseArray(msg, DirectNews.class);
    for (DirectNews news : directNews) {
      System.out.println("-----接受到的消息----" + news.toString());
    }


    try {
      /**
       * 消费确认用来确保消费者是否成功的消费了消息。一旦有消费者成功注册到相应的消息服务，消息将会被消息服务通过basic.deliver推（push）给消费者，
       * 此时消息会包含一个deliver tag用来唯一的标识消息。如果此时是手动模式，就需要手动的确认消息已经被成功消费，否则消息服务将会重发消息
       * （因为消息已经持久化到了硬盘上，所以无论消息服务是不是可能挂掉，都会重发消息）。而且必须确认，无论是成功或者失败，否则会引起非常严重的问题
       * 发现扇形交换机56条消息消费不了。很严重的问题
       *
       *basicNack方法需要传递三个参数
       *  deliveryTag（唯一标识 ID）：上面已经解释了。
       *  multiple：如果设置为true，一条消息应答了，那么之前的全部消息将被应答。比如目前channel中有delivery tags为5,6,7,8的消息，
       *  那么一旦8被应答，那么5，6，7将都被应答，如果设置为false，那么5，6，7将不会被应答。（不建议设置，毕竟一个channel中会绑定好多consumer）
       *  requeue： true ：重回队列，false ：丢弃，我们在nack方法中必须设置 false，否则重发没有意义。
       *  try里面出现异常会回调到returnedMessage方法
       *
       * */
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//消费消息确认
      Thread.sleep(1000);
    } catch (IOException e) {
      e.printStackTrace();
      /**
       * basicReject方法需要传递两个参数
       *
       *     deliveryTag（唯一标识 ID）：上面已经解释了。
       *     requeue：重回队列，在reject方法里必须设置true。
       *     还要说明一下，建议大家不要重发，重发后基本还是失败，因为出现问题一般都是异常导致的，出现异常的话，我的观点是丢弃这个消息，然后在catch里做补偿操作。
       * */
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

      //捕获异常后，重新发送到指定队列，自动ack不抛出异常即为ack
//      channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
//              msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
//              msg.getBody());

    }


  }
}
